import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.ExecutionEnvironment;

public class CoGrouponDataSets {

//    如果两个数据集的key0相等,那么就把他们的key1进行排列组合相乘作为结果
    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String rootPath = System.getProperty("user.dir");
        DataSet<Tuple2<String,Double>>  dVals =env.readCsvFile("file://"+rootPath+"/"+"flatjoin.csv").types(String.class, Double.class);
        DataSet<Tuple2<String,Integer>> iVals =env.readCsvFile("file://"+rootPath+"/"+    "cog1.csv").types(String.class, Integer.class);

            DataSet<Double> output = iVals.coGroup(dVals)
                    // group first DataSet on first tuple field
                    .where(0)
                    // group second DataSet on first tuple field
                    .equalTo(0)
                    // apply CoGroup function on each pair of groups
                    .with(new MyCoGrouper());

       output.print();
    }

}
